[SPARK-732][SPARK-3628][CORE][RESUBMIT] eliminate duplicate update on accumulator #2524
Conversation
@mateiz @mridulm @kayousterhout @markhamstra @pwendell @JoshRosen I propose this as a resubmission of #228. Expecting your review.
Test FAILed.
be10b9b
to
13a190e
Compare
QA tests have started for PR 2524 at commit
QA tests have finished for PR 2524 at commit
Test FAILed.
13a190e
to
9fbe39a
Compare
QA tests have started for PR 2524 at commit
QA tests have finished for PR 2524 at commit
Test FAILed.
OK... I will make MIMA happy.
Test FAILed.
0ef91fc
to
af7ff02
Compare
QA tests have started for PR 2524 at commit
QA tests have finished for PR 2524 at commit
Test PASSed.
BTW, if we don't want to de-duplicate in shuffle stages, we can just move the necessary part to TaskSetManager.
Let's not de-duplicate in shuffle stages, please. That complicates the patch a lot, and I'm not sure why people would necessarily use it. Also, why did you add a duplicate flag to Accumulator? IMO we shouldn't expose this as an option; again, it adds complexity to what should just be a bug fix.
Basically, it would be great to get a really simple patch that only fixes SPARK-3628 and adds no new data structures in DAGScheduler.
The drawback of not de-duplicating in shuffle stages is that it makes accumulator usage very tricky: it effectively discourages using an accumulator in a transformation, especially when the involved stage is shared by multiple jobs or the cluster is not stable. As for the flag, it just gives the user the flexibility to choose whether to accept duplicate updates.
I can simply monitor the accumulator updates in TaskSetManager; I'm just not sure whether that fully resolves the problem.
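The failure mode being debated can be shown without Spark at all. The sketch below uses plain Scala collections and made-up names (`AccumulatorDedup`, `dedupUpdate`): when a stage is resubmitted, its tasks run again, and a naive driver merges the same partition's contribution twice, while remembering which (stageId, partitionId) pairs were already merged keeps the count intuitive.

```scala
import scala.collection.mutable

// Minimal sketch (not Spark's real classes) of duplicate accumulator
// updates: a counter accumulator plus a record of which
// (stageId, partitionId) results have already been merged.
object AccumulatorDedup {
  var counter: Long = 0L
  private val merged = mutable.HashSet[(Int, Int)]()

  // Naive merge: every successful task bumps the counter, so a rerun
  // task is counted twice.
  def naiveUpdate(delta: Long): Unit = counter += delta

  // De-duplicated merge: each (stageId, partitionId) pair is merged at
  // most once, so a resubmitted stage cannot double-count.
  def dedupUpdate(stageId: Int, partitionId: Int, delta: Long): Unit =
    if (merged.add((stageId, partitionId))) counter += delta
}
```

For example, if two partitions each contribute 5 and partition 1 is rerun after a fetch failure, the naive path yields 15 while the de-duplicated path keeps the expected 10.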
It's probably easiest to move the accumulator update to TaskSetManager, or to the part of DAGScheduler that reports the result to the user. It's right below the current update in the code:
That happens only once per task, so it's a good place to do the update for ResultTask. For ShuffleMapTask you can do it in the corresponding match statement as well.
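A hedged sketch of the placement suggested here: the handler that reports results to the user runs once per task, so the merge can live in its match statement. All names below (`CompletedTask`, `CompletionHandler`, etc.) are simplified stand-ins, not the real DAGScheduler or CompletionEvent classes.

```scala
import scala.collection.mutable

// Simplified stand-ins for the task types matched on in the real
// completion handler.
sealed trait CompletedTask { def stageId: Int; def partitionId: Int }
case class ResultTaskDone(stageId: Int, partitionId: Int) extends CompletedTask
case class ShuffleMapTaskDone(stageId: Int, partitionId: Int) extends CompletedTask

object CompletionHandler {
  var accumValue: Long = 0L
  private val reportedPartitions = mutable.HashSet[(Int, Int)]()

  def handle(task: CompletedTask, update: Long): Unit = task match {
    case t: ResultTaskDone =>
      // Merge only the first completion of each result partition, even
      // if the stage is resubmitted and the task runs again.
      if (reportedPartitions.add((t.stageId, t.partitionId))) accumValue += update
    case _: ShuffleMapTaskDone =>
      // The corresponding branch where shuffle stages could be handled;
      // the patch deliberately leaves these un-deduplicated.
      accumValue += update
  }
}
```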
@@ -112,6 +112,10 @@ class DAGScheduler(
   // stray messages to detect.
   private val failedEpoch = new HashMap[String, Long]

   // stageId => (SplitId -> (accumulatorId, accumulatorValue))
   private[scheduler] val stageIdToAccumulators = new HashMap[Int,
This may cause a memory leak?
How?
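The leak concern is that a stageId-keyed map grows without bound unless entries are dropped once no job needs the stage. The sketch below mirrors the shape of the map in the diff with simplified types (Long values only) and an illustrative `cleanupStage` helper that is not Spark's actual API.

```scala
import scala.collection.mutable

// Sketch of the memory-leak concern: per-stage accumulator updates
// keyed by stage id, with explicit cleanup.
object StageAccumulatorState {
  // stageId -> (accumulatorId -> accumulatorValue)
  val stageIdToAccumulators =
    mutable.HashMap.empty[Int, mutable.HashMap[Long, Long]]

  def record(stageId: Int, accumId: Long, value: Long): Unit =
    stageIdToAccumulators
      .getOrElseUpdate(stageId, mutable.HashMap.empty)
      .update(accumId, value)

  // Without a call like this when a stage becomes independent, every
  // stage ever run keeps an entry in the map: the leak being pointed out.
  def cleanupStage(stageId: Int): Unit =
    stageIdToAccumulators.remove(stageId)
}
```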
I think it should work... I'm trying this.
QA tests have started for PR 2524 at commit
QA tests have finished for PR 2524 at commit
Test PASSed.
OK, Jenkins said OK. Finished the modification.
@@ -901,6 +900,33 @@ class DAGScheduler(
     }
   }

   private def updateAccumulator(event: CompletionEvent): Unit = {
Call this updateAccumulators and add a comment saying:
/** Merge updates from a task to our local accumulator values */
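A sketch of the renamed helper, with Spark's Accumulable hierarchy replaced by a plain registry of Long-valued accumulators (`DriverAccumulators` and its field names are illustrative); the real method works on the task's completion event rather than a bare map.

```scala
import scala.collection.mutable

object DriverAccumulators {
  // accumulator id -> current driver-side value
  val values = mutable.HashMap.empty[Long, Long]

  /** Merge updates from a task to our local accumulator values */
  def updateAccumulators(taskUpdates: Map[Long, Long]): Unit =
    for ((id, partial) <- taskUpdates)
      values(id) = values.getOrElse(id, 0L) + partial
}
```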
@CodingCat thanks for the update, this looks good. I just made a few small comments.
Test build #23893 has started for PR 2524 at commit
Test build #23893 has finished for PR 2524 at commit
Test FAILed.
Hey @mateiz, thank you very much for the review. I addressed all of the comments except the "lastId" one, as MIMA wants me to keep it since it's public. Also, a question for you: shall I submit the patch to the old version branches? There are some merge conflicts preventing the patch from applying there directly.
Test build #23895 has started for PR 2524 at commit
Test build #23895 has finished for PR 2524 at commit
Test PASSed.
Can you just not change Accumulator.scala then? That change isn't fixing any kind of bug, it's just a small optimization. Just remove it from this patch.
Test build #23908 has started for PR 2524 at commit
@mateiz sure, I've just rolled back the changes. How about the question of applying the patch to the other branches?
Don't worry about the other branches now; we can figure it out if we want to backport it.
Test build #23908 has finished for PR 2524 at commit
Test PASSed.
Alright, thanks! I've merged this in.
… accumulator

https://issues.apache.org/jira/browse/SPARK-3628

In the current implementation, the accumulator is updated for every successfully finished task, even if the task comes from a resubmitted stage, which makes the accumulator counter-intuitive.

In this patch, I changed the way the DAGScheduler updates the accumulator. The DAGScheduler maintains a hash table mapping each stage id to the received <accumulator_id, value> pairs. Only when the stage becomes independent (no job needs it any more) do we accumulate the values of the <accumulator_id, value> pairs. When a task finishes, we check whether the hash table already contains that stage id; we save the (accumulator_id, value) pair only when the task is the first finished task of a new stage or the stage is running its first attempt.

Author: CodingCat <zhunansjtu@gmail.com>

Closes #2524 from CodingCat/SPARK-732-1 and squashes the following commits:

701a1e8 [CodingCat] roll back change on Accumulator.scala
1433e6f [CodingCat] make MIMA happy
b233737 [CodingCat] address Matei's comments
02261b8 [CodingCat] rollback some changes
6b0aff9 [CodingCat] update document
2b2e8cf [CodingCat] updateAccumulator
83b75f8 [CodingCat] style fix
84570d2 [CodingCat] re-enable the bad accumulator guard
1e9e14d [CodingCat] add NPE guard
21b6840 [CodingCat] simplify the patch
88d1f03 [CodingCat] fix rebase error
f74266b [CodingCat] add test case for resubmitted result stage
5cf586f [CodingCat] de-duplicate on task level
138f9b3 [CodingCat] make MIMA happy
67593d2 [CodingCat] make allowing duplicate updates an option of accumulator

(cherry picked from commit 5af53ad)
Signed-off-by: Matei Zaharia <matei@databricks.com>
@mateiz @CodingCat Apologies, but can I confirm that the scope of this change is strictly to ensure that actions/result stages never duplicate accumulator updates? The PR title and description are more general than this, but the associated JIRAs suggest the restricted scope.
Yes. Originally I tried to do it for both ShuffleMapTask and ResultTask, but later @mateiz convinced me that we actually cannot handle the transformation case, so the current change only involves result tasks. Apologies for not updating the PR title in time.
Yes, it should be only SPARK-3628. |